/* replacement start */
const process = require("../../../../process");
/* replacement end */

const bufferModule = require("../../../../buffer");

const {
  isReadable,
  isWritable,
  isIterable,
  isNodeStream,
  isReadableNodeStream,
  isWritableNodeStream,
  isDuplexNodeStream,
} = require("./utils");

const eos = require("./end-of-stream");

const {
  AbortError,
  codes: { ERR_INVALID_ARG_TYPE, ERR_INVALID_RETURN_VALUE },
} = require("../../ours/errors");

const { destroyer } = require("./destroy");

const Duplex = require("./duplex");

const Readable = require("./readable");

const { createDeferredPromise } = require("../../ours/util");

const from = require("./from");

const Blob = globalThis.Blob || bufferModule.Blob;
const isBlob =
  typeof Blob !== "undefined"
    ? function isBlob(b) {
        return b instanceof Blob;
      }
    : function isBlob(b) {
        return false;
      };

const AbortController =
  globalThis.AbortController ||
  require("../../../plugins/abort-controller").AbortController;

const { FunctionPrototypeCall } = require("../../ours/primordials"); // This is needed for pre node 17.

class Duplexify extends Duplex {
  constructor(options) {
    super(options); // https://github.com/nodejs/node/pull/34385

    if (
      (options === null || options === undefined
        ? undefined
        : options.readable) === false
    ) {
      this._readableState.readable = false;
      this._readableState.ended = true;
      this._readableState.endEmitted = true;
    }

    if (
      (options === null || options === undefined
        ? undefined
        : options.writable) === false
    ) {
      this._writableState.writable = false;
      this._writableState.ending = true;
      this._writableState.ended = true;
      this._writableState.finished = true;
    }
  }
}

module.exports = function duplexify(body, name) {
  if (isDuplexNodeStream(body)) {
    return body;
  }

  if (isReadableNodeStream(body)) {
    return _duplexify({
      readable: body,
    });
  }

  if (isWritableNodeStream(body)) {
    return _duplexify({
      writable: body,
    });
  }

  if (isNodeStream(body)) {
    return _duplexify({
      writable: false,
      readable: false,
    });
  } // TODO: Webstreams
  // if (isReadableStream(body)) {
  //   return _duplexify({ readable: Readable.fromWeb(body) });
  // }
  // TODO: Webstreams
  // if (isWritableStream(body)) {
  //   return _duplexify({ writable: Writable.fromWeb(body) });
  // }

  if (typeof body === "function") {
    const { value, write, final, destroy } = fromAsyncGen(body);

    if (isIterable(value)) {
      return from(Duplexify, value, {
        // TODO (ronag): highWaterMark?
        objectMode: true,
        write,
        final,
        destroy,
      });
    }

    const then = value === null || value === undefined ? undefined : value.then;

    if (typeof then === "function") {
      let d;
      const promise = FunctionPrototypeCall(
        then,
        value,
        (val) => {
          if (val != null) {
            throw new ERR_INVALID_RETURN_VALUE("nully", "body", val);
          }
        },
        (err) => {
          destroyer(d, err);
        }
      );
      return (d = new Duplexify({
        // TODO (ronag): highWaterMark?
        objectMode: true,
        readable: false,
        write,

        final(cb) {
          final(async () => {
            try {
              await promise;
              process.nextTick(cb, null);
            } catch (err) {
              process.nextTick(cb, err);
            }
          });
        },

        destroy,
      }));
    }

    throw new ERR_INVALID_RETURN_VALUE(
      "Iterable, AsyncIterable or AsyncFunction",
      name,
      value
    );
  }

  if (isBlob(body)) {
    return duplexify(body.arrayBuffer());
  }

  if (isIterable(body)) {
    return from(Duplexify, body, {
      // TODO (ronag): highWaterMark?
      objectMode: true,
      writable: false,
    });
  } // TODO: Webstreams.
  // if (
  //   isReadableStream(body?.readable) &&
  //   isWritableStream(body?.writable)
  // ) {
  //   return Duplexify.fromWeb(body);
  // }

  if (
    typeof (body === null || body === undefined ? undefined : body.writable) ===
      "object" ||
    typeof (body === null || body === undefined ? undefined : body.readable) ===
      "object"
  ) {
    const readable =
      body !== null && body !== undefined && body.readable
        ? isReadableNodeStream(
            body === null || body === undefined ? undefined : body.readable
          )
          ? body === null || body === undefined
            ? undefined
            : body.readable
          : duplexify(body.readable)
        : undefined;
    const writable =
      body !== null && body !== undefined && body.writable
        ? isWritableNodeStream(
            body === null || body === undefined ? undefined : body.writable
          )
          ? body === null || body === undefined
            ? undefined
            : body.writable
          : duplexify(body.writable)
        : undefined;
    return _duplexify({
      readable,
      writable,
    });
  }

  const then = body === null || body === undefined ? undefined : body.then;

  if (typeof then === "function") {
    let d;
    FunctionPrototypeCall(
      then,
      body,
      (val) => {
        if (val != null) {
          d.push(val);
        }

        d.push(null);
      },
      (err) => {
        destroyer(d, err);
      }
    );
    return (d = new Duplexify({
      objectMode: true,
      writable: false,

      read() {},
    }));
  }

  throw new ERR_INVALID_ARG_TYPE(
    name,
    [
      "Blob",
      "ReadableStream",
      "WritableStream",
      "Stream",
      "Iterable",
      "AsyncIterable",
      "Function",
      "{ readable, writable } pair",
      "Promise",
    ],
    body
  );
};

function fromAsyncGen(fn) {
  let { promise, resolve } = createDeferredPromise();
  const ac = new AbortController();
  const signal = ac.signal;
  const value = fn(
    (async function* () {
      while (true) {
        const _promise = promise;
        promise = null;
        const { chunk, done, cb } = await _promise;
        process.nextTick(cb);
        if (done) return;
        if (signal.aborted)
          throw new AbortError(undefined, {
            cause: signal.reason,
          });
        ({ promise, resolve } = createDeferredPromise());
        yield chunk;
      }
    })(),
    {
      signal,
    }
  );
  return {
    value,

    write(chunk, encoding, cb) {
      const _resolve = resolve;
      resolve = null;

      _resolve({
        chunk,
        done: false,
        cb,
      });
    },

    final(cb) {
      const _resolve = resolve;
      resolve = null;

      _resolve({
        done: true,
        cb,
      });
    },

    destroy(err, cb) {
      ac.abort();
      cb(err);
    },
  };
}

function _duplexify(pair) {
  const r =
    pair.readable && typeof pair.readable.read !== "function"
      ? Readable.wrap(pair.readable)
      : pair.readable;
  const w = pair.writable;
  let readable = !!isReadable(r);
  let writable = !!isWritable(w);
  let ondrain;
  let onfinish;
  let onreadable;
  let onclose;
  let d;

  function onfinished(err) {
    const cb = onclose;
    onclose = null;

    if (cb) {
      cb(err);
    } else if (err) {
      d.destroy(err);
    } else if (!readable && !writable) {
      d.destroy();
    }
  } // TODO(ronag): Avoid double buffering.
  // Implement Writable/Readable/Duplex traits.
  // See, https://github.com/nodejs/node/pull/33515.

  d = new Duplexify({
    // TODO (ronag): highWaterMark?
    readableObjectMode: !!(
      r !== null &&
      r !== undefined &&
      r.readableObjectMode
    ),
    writableObjectMode: !!(
      w !== null &&
      w !== undefined &&
      w.writableObjectMode
    ),
    readable,
    writable,
  });

  if (writable) {
    eos(w, (err) => {
      writable = false;

      if (err) {
        destroyer(r, err);
      }

      onfinished(err);
    });

    d._write = function (chunk, encoding, callback) {
      if (w.write(chunk, encoding)) {
        callback();
      } else {
        ondrain = callback;
      }
    };

    d._final = function (callback) {
      w.end();
      onfinish = callback;
    };

    w.on("drain", function () {
      if (ondrain) {
        const cb = ondrain;
        ondrain = null;
        cb();
      }
    });
    w.on("finish", function () {
      if (onfinish) {
        const cb = onfinish;
        onfinish = null;
        cb();
      }
    });
  }

  if (readable) {
    eos(r, (err) => {
      readable = false;

      if (err) {
        destroyer(r, err);
      }

      onfinished(err);
    });
    r.on("readable", function () {
      if (onreadable) {
        const cb = onreadable;
        onreadable = null;
        cb();
      }
    });
    r.on("end", function () {
      d.push(null);
    });

    d._read = function () {
      while (true) {
        const buf = r.read();

        if (buf === null) {
          onreadable = d._read;
          return;
        }

        if (!d.push(buf)) {
          return;
        }
      }
    };
  }

  d._destroy = function (err, callback) {
    if (!err && onclose !== null) {
      err = new AbortError();
    }

    onreadable = null;
    ondrain = null;
    onfinish = null;

    if (onclose === null) {
      callback(err);
    } else {
      onclose = callback;
      destroyer(w, err);
      destroyer(r, err);
    }
  };

  return d;
}
